Skip to content

一、概述

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官网

优势:

  • 使原本需同步完成的操作变成异步执行,降低各模块的耦合性
  • 秒杀服务可使用MQ进行流量削峰,大量用户请求在MQ的消息队列中依次执行

劣势:

  • 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。需要保证高可用
  • 系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。需要保证消息的幂等性,保证不丢失和处理的顺序。
  • 一致性问题 A 系统处理完业务,需要保证多系统处理时的一致性问题。

1.1 相关概念

图片1

图片2

Message消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由 一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先 权)、delivery-mode(指出消息可能持久性存储)等。
Publisher消息的生产者。也是一个向交换器发布消息的客户端应用程序。
Consumer消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
Exchange交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
三种常用的交换器类型 direct(发布与订阅 完全匹配) 、fanout(广播)、topic(主题,规则匹配)。message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。
Binding绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息
队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Queue消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一 个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取 走。
Routing-key路由键。RabbitMQ 决定消息该投递到哪个队列的规则。 队列通过路由键绑定到交换器。消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。 如果不匹配,消息将会进入黑洞。
Connection链接。指 rabbit 服务器和服务建立的 TCP 链接。publisher/consumer 和 broker 之间的 TCP 连接
ChannelChannel 中文叫做信道,是 TCP 里面的虚拟链接。例如:电缆相当于 TCP,信道是一个独立光纤束,一条 TCP 连接上创建多条信道是没有问题的。TCP 一旦打开,就会创建 AMQP 信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Virtual Host虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定, RabbitMQ 默认的 vhost 是**/**。出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Borker表示消息队列服务器实体。接收和分发消息的应用,RabbitMQ Server就是 Message Broke
交换器和队列的关系交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由 键匹配分发消息到具体的队列中。 路由键可以理解为匹配的规则
  • RabbitMQ 为什么需要信道?为什么不是 TCP 直接通信?

    TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。

    如果不用信道,那应用程序就会以 TCP 链接 Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理 TCP 链接数也是有限制的,必定造成性能瓶颈。

    信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

1.2 消息中相关角色

provider消息生产者,就是投递消息的程序
Consumer消息消费者,就是接受消息的程序。
队列队列是生产者和消费者之间的中转站
队列和生产这及消费者之间的关系多个生产者可以将消息发送到同一个队列中,多个消息者也可以只从同一个队列接收数据。

1.3 RabbitMQ运转流程

  • 生产者发送消息
    1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
    2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
    3. 将路由键(空字符串)与队列绑定起来;
    4. 发送消息至RabbitMQ Broker;
    5. 关闭信道;
    6. 关闭连接;
  • 消费者接收消息
    1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
    2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
    3. 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
    4. 确认(ack,自动确认)接收到的消息;
    5. RabbitMQ从队列中删除相应已经被确认的消息;
    6. 关闭信道;
    7. 关闭连接;

生产者流转过程说明

  1. 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

消费者流转过程说明

  1. 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
  3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

1.4 常用命令

sh
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

# 查看队列


<NolebasePageProperties />




rabbitmqctl list_queues
# 查看exchanges
rabbitmqctl list_exchanges
# 查看用户
rabbitmqctl list_users
# 查看连接
rabbitmqctl list_connections
# 查看消费者信息
rabbitmqctl list_consumers
# 查看环境变量
rabbitmqctl environment
# 查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged
# 查看单个队列的内存使用
rabbitmqctl list_queues name memory
# 查看准备就绪的队列
rabbitmqctl list_queues name messages_ready

1.5 日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等

二、基本使用

2.1 工具类

配置类

java

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * MQ配置类
 * @author Brack.zhu
 * @date 2020/4/21
 */
@Configuration
public class AmqpConfig {

    /**
     * 自定义MQ消息转换
     * @author sunwen
     * @since 2019/10/25
     */
    @Bean
    public MessageConverter messageConverter() {
        return new RabbitMqFastJsonConverter();
    }



    /**
     * RabbitMq消息对象转换改用FastJson
     * @author sunwen
     * @since 2019/10/25
     */
    private static class RabbitMqFastJsonConverter extends AbstractMessageConverter {

        private Logger logger = LoggerFactory.getLogger(RabbitMqFastJsonConverter.class);
        private static ClassMapper classMapper = new RabbitMqFastJsonClassMapper();
        private static String DEFAULT_CHART_SET = "UTF-8";

        @Override
        protected Message createMessage(Object o, MessageProperties messageProperties) {
            byte[] bytes;
            try {
                String jsonString = JSON.toJSONString(o);
                bytes = jsonString.getBytes(DEFAULT_CHART_SET);
            } catch (IOException e) {
                throw new MessageConversionException("Failed to convert Message content", e);
            }
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            messageProperties.setContentEncoding(DEFAULT_CHART_SET);
            messageProperties.setContentLength(bytes.length);
            classMapper.fromClass(o.getClass(), messageProperties);
            return new Message(bytes, messageProperties);
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            Object content = null;
            MessageProperties properties = message.getMessageProperties();
            if (properties != null) {
                String contentType = properties.getContentType();
                if (contentType != null && contentType.contains("json")) {
                    String encoding = properties.getContentEncoding();
                    if (encoding == null) {
                        encoding = DEFAULT_CHART_SET;
                    }
                    Class<?> targetClass = classMapper.toClass(message.getMessageProperties());
                    try {
                        content = convertBytesToObject(message.getBody(), encoding, targetClass);
                    } catch (UnsupportedEncodingException e) {
                        throw new MessageConversionException("Failed to convert Message content", e);
                    }
                } else {
                    logger.warn("Could not convert incoming message with content-type [" + contentType + "]");
                }
            }
            if (content == null) {
                content = message.getBody();
            }
            return content;
        }

        private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws UnsupportedEncodingException {
            String contentAsString = new String(body, encoding);
            return JSON.parseObject(contentAsString, clazz);
        }
    }

    public static class RabbitMqFastJsonClassMapper extends DefaultClassMapper {
        public RabbitMqFastJsonClassMapper() {
            super();
            setTrustedPackages("*");
        }
    }


}

工具类

java

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Amqp 配置
 * @author sunwen
 * @since 2019/4/3
 */
@Component
public class AmqpHelper {

    @Value("${mqc.exchange}")
    private String defaultExchangeName;
    @Autowired
    private AmqpTemplate amqpTemplate;

//    @Bean
//    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//        rabbitAdmin.setAutoStartup(true);
//        return rabbitAdmin;
//    }

    /**
     * 微服务默认消息交换器
     * @author sunwen
     * @since 2019/10/25
     */
    @Bean
    public TopicExchange defaultExchange() {
        return new TopicExchange(defaultExchangeName);
    }

    /**
     * 发送消息到默认exchange
     * 发布/订阅模式,根据routingKey规则指定订阅类型(参考RabbitMQ的topic模式)
     * @author sunwen
     * @since 2019/10/25
     */
    public void sendToExchange(String routingKey, Object message){
        amqpTemplate.convertAndSend(defaultExchangeName, routingKey, message);
    }

    /**
     * 发送消息到指定队列
     * 生产者消费者模式,一个队列可由1个或多个消费者同时消费(参考RabbitMQ的work模式)
     * @author sunwen
     * @since 2019/10/26
     */
    public void sendToQueue(String queueName, Object message){
        amqpTemplate.convertAndSend(queueName, message);
    }

}

2.2 原生入门demo

依赖

xml
 <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.6.0</version>
</dependency>

生产者

java
// 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

消费者

java
// 消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2.3 amqp入门demo

依赖

xml
<dependency> 
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

properties
spring.application.name=springcloud-mq 
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672 
spring.rabbitmq.username=oldlu 
spring.rabbitmq.password=123456

创建队列

java
/**
* 创建消息队列 
* @author Administrator 
*
*/
@Configuration
public class QueueConfig {
    
    /*** 交换机配置
    * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 
    * @return the exchange 
    */ 
    @Bean(EXCHANGE_TOPICS_INFORM) 
    public Exchange EXCHANGE_TOPICS_INFORM() { 
        //durable(true)持久化,消息队列重启后交换机仍然存在 return 
        ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 
    }
    /**
    * 创建队列 
    * @return 
    */ 
    @Bean 
    public Queue createQueue(){
        return new Queue("hello-queue"); 
    } 
    
    /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); 
    * 绑定队列到交换机 . 
    *
    * @param queue the queue 
    * @param exchange the exchange 
    * @return the binding 
    */
    @Bean 
    public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 
        return BindingBuilder.bind(queue).to(exchange)
            .with("inform.#.sms.#").noargs();
    }
    
}

生产者

/**
* 消息发送者 
* @author Administrator
*
*/
@Component
public class Sender {
    @Autowired 
    private AmqpTemplate rabbitAmqpTemplate;
    /*
    * 发送消息的方法 
    */ 
    public void send(String msg){
        //向消息队列发送消息
        //参数一:队列的名称。
        //参数二:消息 
        this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg); 
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
    }
}

消费者

java
/**
* 消息接收者 
* @author Administrator 
*
*/
@Component
public class Receiver { 
    /**
    * 接收消息的方法。采用消息队列监听机制 
    * @param msg 
    */ 
    @RabbitListener(queues="hello-queue") 
    public void process(String msg){
        System.out.println("receiver: "+msg);
    } 
}

三种匹配方式案例

  • 配置
properties
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672 
spring.rabbitmq.username=oldlu spring.rabbitmq.password=123456 
#设置交换器的名称 
mq.config.exchange=log.direct 
#info 队列名称 
mq.config.queue.info=log.info
#info 路由键 
mq.config.queue.info.routing.key=log.info.routing.key 
#error 队列名称 
mq.config.queue.error=log.error 
#error 路由键 
mq.config.queue.error.routing.key=log.error.routing.key
  • 生产者

    java
    /**
    * 消息发送者 
    * @author Administrator 
    *
    */ 
    @Component 
    public class Sender {
        @Autowired 
        private AmqpTemplate rabbitAmqpTemplate; 
        //exchange 交换器名称 
        @Value("${mq.config.exchange}") 
        private String exchange; 
        //routingkey 路由键
        @Value("${mq.config.queue.error.routing.key}") 
        private String routingkey;
        /*
        * 发送消息的方法 
        */ 
        public void send(String msg){
            //向消息队列发送消息 
            //参数一:交换器名称。 
            //参数二:路由键 
            //参数三:消息
            this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
        } 
    }
  • 消费者

    java
    /**
    * 消息接收者 
    * @author Administrator 
    * @RabbitListener bindings:绑定队列 
    * @QueueBinding value:绑定队列的名称  名称不同队列不同
    * exchange:配置交换器 
    * key:取全名称时为完全匹配,为"*.log.*"为规则匹配,满足则接收 
    *
    * @Queue value:配置队列名称 
    * autoDelete:是否是一个可删除的临时队列 
    *
    * @Exchange value:为交换器起个名称 
    * type:指定具体的交换器类型  当为规则匹配时类型为type=ExchangeTypes.TOPIC
    *
    *
    */ 
    /**
    *完全匹配时: QueueBinding key为全名称 @Exchange type为ExchangeTypes.DIRECT
    * 只有key值与${mq.config.queue.info.routing.key}值相等的才会被接收
    * key可以为数组 {"red", "blue"}
    */
    @Component 
    @RabbitListener( bindings=@QueueBinding(
        value=@Queue(value="${mq.config.queue.info}",
                     autoDelete="true"),
        exchange=@Exchange(value="${mq.config.exchange}",
                           type=ExchangeTypes.DIRECT), 
        key="${mq.config.queue.info.routing.key}" ) ) 
    public class InfoReceiver { 
        /**
        * 接收消息的方法。采用消息队列监听机制
        * @param msg 
        */ 
        @RabbitHandler 
        public void process(String msg){
            System.out.println("Info........receiver: "+msg);
        } 
    }
    
    /**
    *
    *规则匹配时:QueueBinding key为匹配规则,@Exchange type为ExchangeTypes.TOPIC
    * Topic交换机接收的消息RoutingKey必须是多个单词,以 `.` 分割
    * Topic交换机与队列绑定时的bindingKey可以指定通配符 `#`:代表0个或多个词  `*`:代表1个词
    * `item.#`:能够匹配`item.spu.insert` 或者 `item.spu`
    * `item.*`:只能匹配`item.spu`
    */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    
    /**
    *广播时:不需要配置key值,发送值时路由键为空字符串, @Exchange type=Excha ngeTypes.FANOUT
    * FanoutExchange的会将消息路由到每个绑定的队列
    */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }

三、核心知识

3.1 消息可靠性

消息从发送,到消费者接收,会经理多个过程:

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

可靠性保证方式:

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
1. 生产消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

首先,修改publisher服务中的application.yml文件,添加下面的内容:

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

java
package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}

定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

java
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息发送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失败
                log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);
}
2. 消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

java
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

java
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

默认情况下,SpringAMQP发出的任何消息都是持久化的

3. 消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则 调用channel.basicNack()方法,让其自动重新发送消息

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

autoDelete 属性

  • @Queue: 当所有消费客户端连接断开后,是否自动删除

    队列 true:删除 false:不删除

  • @Exchange:当所有绑定队列都不在使用时,是否自动

    删除交换器 true:删除 false:不删除

消息确认ACK

图片3

4. 消息消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。本地重试达到最大次数后,Spring会返回ack,消息会被丢弃

yml
# 开启本地重试
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
5. 失败策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
public class ErrorMessageConfig {
    // 失败消息的交换机
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    // 失败消息的队列
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    // 交换机绑定
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
	// RepublishMessageRecoverer,关联队列和交换机
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
6. 事务支持

务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。

外部事务的配置:spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml

xml
<!-- channel-transacted="true" 表示:支持事务操作 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
       confirm-callback="confirmCallback" return-callback="sendReturnCallback"
       channel-transacted="true" />

<!--平台事务管理器-->
<bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connectionFactory"/>
</bean>
java
@Transactional
public class ProducerTest;
    
@Test
public void queueTest2(){
    //路由键与队列同名
    rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。");
    System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务类型的操作---------------");
    //模拟业务处理失败
    System.out.println(1/0);
    rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02。");
}

3.2 死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

一个队列中配置dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

1. 队列绑定死信交换机

配置方式

xml
 <!--定义定向交换机中的持久化死信队列,不存在则自动创建-->
    <rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
        <rabbit:bindings>
            <!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队列-->
            <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
            <rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

 <!--定义过期队列及其属性,不存在则自动创建-->
    <rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机-->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
            <!--设置当消息过期后投递到对应的死信交换机-->
            <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--定义限制长度的队列及其属性,不存在则自动创建-->
    <rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机-->
            <entry key="x-max-length" value-type="long" value="2"/>
            <!--设置当消息过期后投递到对应的死信交换机-->
            <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--定义定向交换机 根据不同的路由key投递消息-->
    <rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
            <rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
java
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
2. TTL(超时时间)

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。

配置方式

xml
 <!--定义过期队列及其属性,不存在则自动创建-->
<!--参数 x-message-ttl 的值 必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前 消息 将最多只存活 6 秒钟。-->
    <rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息如果没有消费都将在6秒之后被删除-->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
java
// 队列设置超时时间
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .build();
}

// 消息设置超时时间
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        // expiration 字段以微秒为单位表示 TTL 值
        .setExpiration("5000")
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}

3.3 延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html

使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

1. 安装

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

通过docker安装

为mq创建数据卷

docker volume inspect mq-plugins

将插件上传到此目录

shell
# 进入docker
docker exec -it mq bash
# 开启插件
rabbitmq-plugins enable 
rabbitmq_delayed_message_exchange
2. 原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列
3. 使用
java
// 基于注解方式
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delayed.queue", durable = "true"),
    exchange = @Exchange(name = "delayed.direct", delayed = "true"),
    key = "delayed"
))

// 基于bean
@Bean
public Queue directQueue(){
    return QueueBuilder.durable("direct.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .delayed()	// 设置延迟
        .build();
}

// 发送消息
// 创建消息
    Message message = MessageBuilder
        .withBody("hello, direct message".getBytes(StandardCharsets.UTF_8))
        .setHeader("x-delay",10000)
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("direct.direct", "direct", message, correlationData);

3.4 惰性队列

惰性队列的优点

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO
1. 消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

  • 消息堆积的影响

    • 可能导致新消息无法进入队列
    • 可能导致旧消息无法丢失
    • 消息等待消费的时间过长,超出了业务容忍范围。
  • 产生堆积的情况

    • 生产者突然大量发布消息
    • 消费者消费失败
    • 消费者出现性能瓶颈。
    • 消费者挂掉
  • 解决办法

    • 排查消费者的消费性能瓶颈
    • 增加消费者的多线程处理
    • 部署增加多个消费者

    消息队列堆积,想办法把消息转移到一个新的队列,增加服务器慢慢来消费这个消息可以

    1、解决消费者消费异常问题

    2、解决消费者的性能瓶颈:改短休眠时间

    5.4小时。

    3、增加消费线程,增加多台服务器部署消费者。快速消费。

    增加10个线程

    concurrency="10" prefetch="10"
2. 惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

基于命令行设置lazy-queue

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

sh
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列

基于bean

java
@Bean
public Queue lazyQueue(){
    return QueueBuilder.durable("lazy.queue") // 指定队列名称,并持久化
        .lazy()
        .build();
}

基于@RabbitListener

java
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delayed.queue", durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")),
    exchange = @Exchange(name = "delayed.direct", delayed = "true"),
    key = "delayed"
))

3.5 工作模式

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式

模式介绍

1. Work queues 工作队列模式

多个消费端共同消费同一个队列中的消息。适合应用于对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

2. Pub/Sub 订阅模式

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!

到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到绑定的信息。

3. Routing 路由模式
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

到RabbitMQ的管理后台找到Exchanges选项卡,点击 direct_exchange 的交换机,可以查看到绑定信息。

4. Topics 通配符模式
  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到绑定的信息

5. 总结
  1. 简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
  2. 工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
  3. 发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消 息发送到绑定的队列。
  4. 路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机 后,交换机会根据 routing key 将消息发送到对应的队列。
  5. 通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送 消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

3.6 消息追踪

当消息丢失时,需要可以跟踪记录消息的投递过程,可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

Firehose

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式 发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类 型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别 对应生产者投递到exchange的消息,和消费者从queue上获取的消息。 注意:打开 trace 会影响消息写入功能,适当打开后请关闭。

rabbitmqctl trace_on:开启Firehose命令

rabbitmqctl trace_off:关闭Firehose命令

rabbitmq_tracin

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一 层GUI的包装,更容易使用和管理。 启用插件:rabbitmq-plugins enable rabbitmq_tracing

命令集描述
rabbitmq-plugins list查看插件列表
rabbitmq-plugins enable rabbitmq_tracingrabbitmq启用trace插件
rabbitmqctl trace_on打开trace的开关
rabbitmqctl trace_on -p itcast打开trace的开关(itcast为需要日志追踪的vhost)
rabbitmqctl trace_off关闭trace的开关
rabbitmq-plugins disable rabbitmq_tracingrabbitmq关闭Trace插件
rabbitmqctl set_user_tags heima administrator只有administrator的角色才能查看日志界面

安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。

发送消息后在管理页面的消息队列点击消息点击 admin 点击Tracing查看Trace log files中查看

3.7 消息丢失

消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。

1. 消息在生产者丢失

消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。

解决:

采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。

异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。

2. 消息在RabbitMQ丢失

消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况

持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。

spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。

3. 消息在消费者丢失

消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。

设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。如:以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率

MQ重发消息场景:

1.消费者未响应ACK,主动关闭频道或者连接

2.消费者未响应ACK,消费者服务挂掉

3.8 有序消费

场景1:

当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。

img

解决方案

img

场景2:

当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。

img

解决

img

3.9 重复消费

为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。

MQ重发消息场景:

1.消费者未响应ACK,主动关闭频道或者连接

2.消费者未响应ACK,消费者服务挂掉

解决方案

如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。

setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0

3.10 WorkQueue

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

消息消费为平均分为每个消费者,当消费者效率不一致时,会造成性能差的处理较慢。可通过配置,使消费者一次只能接收一条消息

yaml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.11 消息补偿

image-20220524115409698

3.12 消息幂等性

image-20220524115450965

四、安装

4.1 常规安装

  • 安装Erlang

  • 设置远程ip登录

    shell
    #创建账号
    rabbitmqctl add_user oldlu 123456
    #设置用户角色
    rabbitmqctl set_user_tags oldlu administrator
    #设置用户权限
    rabbitmqctl set_permissions -p "/" oldlu ".*" ".*" ".*"
    #设置完成后可以查看当前用户和角色(需要开启服务)
    rabbitmqctl list_users
    # serverip:15672访问,serverip为RabitMQ-Server所在的主机ip

4.2 docker安装

shell
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

4.3 集群类型

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

    普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。

  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

    会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

主备模式下主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,备份节点的主要功能是在主节点宕机时,完成自动切换 从-->主,同时因为主备模式下始终只有一个对外提供服务那么对于高并发的情况下该模式并不合适.

远程模式可以让我们实现异地多活的mq,但是现在已经有了更好的异地多活解决方案,所以在实际的项目中已经不推荐使用了

镜像队列模式可以让我们的消息100%不丢失,同时可以结合HAProxy来实现高并发的业务场景所以在项目中使用得最多

4.4 普通集群安装

获取cookie

abbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

shell
# 启动的mq容器中获取一个cookie值,作为集群的cookie
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
# 停止并删除当前的mq容器 重新搭建
docker rm -f mq

准备集群配置

shell
# 在/tmp下新建rabbitmq.conf
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

# 记录cookie文件
cd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的权限
chmod 600 .erlang.cookie

启动集群

shell
# 创建网络
docker network create mq-net
docker volume create 
# 第一个
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management

# 第二个
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management

# 第三个
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

4.5 镜像集群

创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。

但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点

用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用

当主节点接收到消费者的ACK时,所有镜像都会删除节点中的数据。

总结如下:

  • 镜像队列结构是一主多从(从就是镜像)
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
  • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)

配置模式

镜像模式的配置有3种模式:

ha-modeha-params效果
准确模式exactly队列的副本量count集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all(none)队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodesnode names指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

以rabbitmqctl命令为案例,讲解语法

exactly模式

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定写法
  • ha-two:策略名称,自定义
  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

all模式

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义
  • "^all\.":匹配所有以all.开头的队列名
  • '{"ha-mode":"all"}':策略内容
    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

nodes模式

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法
  • ha-nodes:策略名称,自定义
  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    • "ha-mode":"nodes":策略模式,此处是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

4.6 仲裁队列

从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能。使用更加方便。

在控制台添加队列时队列类型选择Quorum类型。裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;

java创建仲裁队列

java
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁队列
        .build();
}

4.7 集群扩容

加入集群

启动一个新的MQ容器:

sh
docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)进入容器控制台:

sh
docker exec -it mq4 bash

3)停止mq进程

sh
rabbitmqctl stop_app

4)重置RabbitMQ中的数据:

sh
rabbitmqctl reset

5)加入mq1:

sh
rabbitmqctl join_cluster rabbit@mq1

6)再次启动mq进程

sh
rabbitmqctl start_app

增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:

sh
docker exec -it mq1 bash

执行命令:

sh
rabbitmq-queues quorum_status "quorum.queue"

mq4加入

sh
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

4.8 负载均衡-HAProxy

HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。

安装HAProxy

shell
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg

配置HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

shell
#logging options
global
    log 127.0.0.1 local0 info
    maxconn 5120
    # ha的安装地址
    chroot /usr/local/haproxy
    uid 99
    gid 99
    daemon
    quiet
    nbproc 20
    pidfile /var/run/haproxy.pid

defaults
    log global
    #使用4层代理模式,”mode http”为7层代理模式
    mode tcp
    #if you set mode to tcp,then you nust change tcplog into httplog
    option tcplog
    option dontlognull
    retries 3
    option redispatch
    maxconn 2000
    contimeout 5s
     ##客户端空闲超时时间为 30秒 则HA 发起重连机制
     clitimeout 30s
     ##服务器端链接超时时间为 15秒 则HA 发起重连机制
     srvtimeout 15s 
#front-end IP for consumers and producters

listen rabbitmq_cluster
    bind 192.168.13.104:5672
    #配置TCP模式
    mode tcp
    #balance url_param userid
    #balance url_param session_id check_post 64
    #balance hdr(User-Agent)
    #balance hdr(host)
    #balance hdr(Host) use_domain_only
    #balance rdp-cookie
    #balance leastconn
    #balance source //ip
    #简单的轮询
    balance roundrobin
    #rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
        server server1 192.168.13.101:5672 check inter 5000 rise 2 fall 2
        server server2 192.168.13.102:5672 check inter 5000 rise 2 fall 2
        server server3 192.168.13.103:5672 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen stats
    bind 192.168.13.104:8100 # 注意此处的ip地址,我们配置了2台机器
    mode http
    option httplog
    stats enable
    #设置haproxy监控地址为http://192.168.13.104:8100/rabbitmq-stats
    stats uri /rabbitmq-stats
    stats refresh 5s

启动HAproxy负载

shell
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy

访问如下地址对mq节点进行监控
http://172.16.98.133:8100/rabbitmq-stats

代码中访问mq集群地址,则变为访问haproxy地址:5672

4.9 KeepAlived 搭建高可用的HAProxy集群

Keepalived,它是一个高性能的服务器高可用或热备解决方案,Keepalived主要来防止服务器单点故障的发生问题,可以通过其与Nginx、Haproxy等反向代理的负载均衡服务器配合实现web服务端的高可用。Keepalived以VRRP协议为实现基础,用VRRP协议来实现高可用性(HA).VRRP(Virtual Router Redundancy Protocol)协议是用于实现路由器冗余的协议,VRRP协议将两台或多台路由器设备虚拟成一个设备,对外提供虚拟路由器IP(一个或多个)。

1. 安装

sh
#安装所需软件包
yum install -y openssl openssl-devel
#下载安装包
wget http://www.keepalived.org/software/keepalived-1.2.18.tar.gz
#解压、编译、安装
tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/
cd /usr/local/keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalived
make && make install
# 将keepalived安装成Linux系统服务,因为没有使用keepalived的默认安装路径(默认路径:/usr/local),安装完成之后,需要做一些修改工作
#首先创建文件夹,将keepalived配置文件进行复制:
mkdir /etc/keepalived
cp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived/
#然后复制keepalived脚本文件:
cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/
cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/
# 设置的过程中如果出现已经存在则删除原有的文件,重新创建
ln -s -f /usr/local/sbin/keepalived /usr/sbin/
ln -s -f /usr/local/keepalived/sbin/keepalived /sbin/
#可以设置开机启动:chkconfig keepalived on,到此我们安装完毕!
chkconfig keepalived on

2. 配置

vim /etc/keepalived/keepalived.conf
###############具体配置
! Configuration File for keepalived
global_defs {
   router_id server4  ##标识节点的字符串,通常为本机hostname
}
vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh"  ##执行脚本位置
    interval 2  ##检测时间间隔
    weight -20  ##如果条件成立则权重减20
}
vrrp_instance VI_1 {
    state MASTER  ## 主节点为MASTER,备份节点为BACKUP-该配置非常重要
    interface ens33 ## 绑定虚拟IP的网络接口(网卡可以使用ifconfig查看)
    virtual_router_id 110  ## 虚拟路由ID号(主备节点一定要相同)-该配置非常重要
    mcast_src_ip 192.168.13.104 ## 本机ip地址
    priority 100  ##优先级配置(0-254的值),一般主节点的权重大于备份节点
    nopreempt
    advert_int 1  ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
authentication {  ## 认证匹配
        auth_type PASS
        auth_pass itcast
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress {
        192.168.13.110  ## 虚拟ip,可以指定多个,以后连接mq就使用该虚拟ip进行连接
    }
}

3. 编写执行脚本

shell
#添加文件位置为/etc/keepalived/haproxy_check.sh
#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    sleep 2
    if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
        killall keepalived
    fi
fi

# 授权
chmod +x /etc/keepalived/haproxy_check.sh

4. 启动

sh
#启动两台机器的keepalived
service keepalived start | stop | status | restart
#查看状态
ps -ef | grep haproxy
ps -ef | grep keepalived

五、集群监控

1. 管理界面监控

管理界面监控需要我们开启对应的插件(rabbitmq-plugins enable rabbitmq_management)

然后访问http://ip:15672

2. 定制监控

RabbitMQ提供了很丰富的restful风格的api接口,我们可以通过这些接口得到对应的集群数据,此时我们就可以定制我们的监控系统。

HTTP API URLHTTP 请求类型接口含义
/api/connectionsGET获取当前RabbitMQ集群下所有打开的连接
/api/nodesGET获取当前RabbitMQ集群下所有节点实例的状态信息
/api/vhosts/{vhost}/connectionsGET获取某一个虚拟机主机下的所有打开的connection连接
/api/connections/{name}/channelsGET获取某一个连接下所有的管道信息
/api/vhosts/{vhost}/channelsGET获取某一个虚拟机主机下的管道信息
/api/consumers/GET获取某一个虚拟机主机下的所有消费者信息
/api/exchanges/GET获取某一个虚拟机主机下面的所有交换器信息
/api/queues/GET获取某一个虚拟机主机下的所有队列信息
/api/usersGET获取集群中所有的用户信息
/api/users/GET/PUT/DELETE获取/更新/删除指定用户信息
/api/users/{user}/permissionsGET获取当前指定用户的所有权限信息
/api/permissions/{vhost}/GET/PUT/DELETE获取/更新/删除指定虚拟主机下特定用户的权限
/api/exchanges/{vhost}/{name}/publishPOST在指定的虚拟机主机和交换器上发布一个消息
/api/queues/{vhost}/{name}/getPOST在指定虚拟机主机和队列名中获取消息,同时该动作会修改队列状态
/api/healthchecks/node/GET获取指定节点的健康检查状态

更多API的相关信息和描述可以访问http://ip:15672/api/

3. tracing日志监控

对于企业级的应用开发来讲,我们通常都会比较关注我们的消息,甚至很多的场景把消息的可靠性放在第一位,但是我们的MQ集群难免会出现消息异常丢失或者客户端无法发送消息等异常情况,此时为了帮助开发人员快速的定位问题,我们就可以对消息的投递和消费过程进行监控,而tracing日志监控插件帮我们很好的实现了该功能

4. Zabbix 监控RabbitMQ

Zabbix是一个基于WEB界面提供分布式系统监视以及网络监视功能的企业级开源解决方案,他也可以帮助我们搭建一个MQ集群的监控系统,同时提供预警等功能,官网